{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 用`multiprocessing`和`ProcessPoolExecutor`模块\n",
    "上一章我们用`threading`模块演示了两个例子。这一章我们将介绍`multiprocessing`的用法，实现与`threading`类似的接口。但是，我们将用进程范式。\n",
    "\n",
    "本章内容包括以下主题：\n",
    "- 理解进程的概念\n",
    "- 理解多进程通信\n",
    "- 用`multiprocessing`实现多请求的Fibonacci数列\n",
    "- 用`ProcessPoolExecutor`实现并行网络爬虫\n",
    "\n",
    "<!-- TEASER_END-->"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 理解进程的概念\n",
    "在操作系统中，*进程*是程序执行和相关资源的容器。所有与程序执行有关的资源都由进程管理——程序的数据区域，子进程，运行状态，还有与其他进程之间的通信。\n",
    "\n",
    "### 理解进程模型\n",
    "进程离不开对资源和信息的控制与操作。操作系统有一个进程控制块（ Process Control Block，PCB），里面存储与进行有关的信息。例如，PCB会存储下面的信息：\n",
    "- **进程ID**：是唯一的整数值（无符号），表示进程在操作系统中的识别号\n",
    "- **程序计数器（Program Counter，PC）**：这包括下一个要被执行的程序指令的地址\n",
    "- **I/O信息**：与进程相关的打开文件和设备列表\n",
    "- **内存配置**：这里存储了进程和页表（tables of paging）使用和存储的内存空间\n",
    "- **CPU进程排序（CPU scheduling）**：这里存储了进程优先级和蹒跚队列（staggering queues）的信息\n",
    "- **优先级**：定义CPU执行的进程优先级\n",
    "- **当前状态**：表示进程状态是就绪，等待或运行\n",
    "- **CPU注册表**：存储栈指针和其他信息\n",
    "\n",
    "#### 进程状态定义\n",
    "进程的生命周期有三种状态，如下所示：\n",
    "- **运行**：进程正在使用CPU\n",
    "- **就绪**：进程在队列中等待CPU调用\n",
    "- **等待**：进行等待与任务执行相关的I/O操作完成\n",
    "\n",
    "## 实现多进程通信\n",
    "[`multiprocessing`模块](https://docs.python.org/3/library/multiprocessing.html)允许两种方式实现进程间通信，均是通过信息传递实现的。如前面章节里介绍的，信息传递范式没有同步机制缺乏，数据通过复制在进程间交换。\n",
    "### 使用`multiprocessing.Pipe`\n",
    "管道`pipe`是一种可以建立两个节点（endpoint）（两个进程）之间的通信的机制。这种创建通道（channel）的方式是为了实现进程间信息交换。\n",
    ">Python官方文档推荐使用管道实现两个节点的通信，是因为管道不能同时安全地与第三个节点进行通信。\n",
    "\n",
    "们将实现一个能够创建两个进程的Python程序（生成消费模型），来演示`multiprocessing.Pipe`对象的用法，进程A和进程B。进程A随机发送1到10范围内的整数值给进程B，进程B把收到的整数值显示到屏幕上。现在，让我们来看看源代码。\n",
    "\n",
    "首先我们导入一些需要使用的模块，如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import os\n",
    "import random\n",
    "from multiprocessing import Process, Pipe"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "这里`os`模块可以让我们通过[`os.getpid()`](https://docs.python.org/3/library/os.html)获取进程的PID。这个`os.getpid()`返回的PID可以让整个例子变得透明可见。`producer_task`和`consumer_task`函数运行时，对应进程的PID都会显示出来。\n",
    "\n",
    "之后的代码中，我们定义生产者`producer_task`函数，首先通过`random.randint(1,10)`生成一串随机数。函数的关键点是调用了`conn.send(value)`，这是用主函数里的`Pipe`生成的连接对象，将作为函数的参数使用。`producer_task`函数代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def producer_task(conn):\n",
    "    value = random.randint(1, 10)\n",
    "    conn.send(value)\n",
    "    print('Value [%d] sent by PID [%d]' % (value, os.getpid()))\n",
    "    conn.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    ">当`Pipe`对象使用`send`方法发生数据之后，不要忘了用`close()`方法。在连接的通道不再使用之后释放资源是非常重要的事情。\n",
    "\n",
    "消费者进程要做的事情很简单，就是把收到的信息打印到屏幕上，并显示进程的PID信息。为了从通信的通道中获取整数值，需要调用[`conn.recv()`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Connection.recv)。`consumer_task`函数代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def consumer_task(conn):\n",
    "    print('Value [%d] received by PID [%d]' % (conn.recv(), os.getpid()))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "代码的最后一部分就是调用`Pipe()`对象创建两个进程表示生产者和消费者进程。之后，在用把`producer_task`和`consumer_task`函数分别发送到两个进程中。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "if __name__ == '__main__':\n",
    "    producer_conn, consumer_conn = Pipe()\n",
    "    consumer = Process(target=consumer_task, args=(consumer_conn,))\n",
    "    producer = Process(target=producer_task, args=(producer_conn,))\n",
    "\n",
    "    consumer.start()\n",
    "    producer.start()\n",
    "\n",
    "    consumer.join()\n",
    "    producer.join()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "进程定义之后，再使用`start()`方法初始化执行，然后用`join()`方法使得主函数进程等待生产者和消费者进程执行完毕后再结束。\n",
    "\n",
    "运行`multiprocessing_pipe.py`程序，结果如下图所示：\n",
    "![](http://muxuezi.github.io/posts/ppp/ch5/multiprocessing_pipe.png)\n",
    "[源代码](http://muxuezi.github.io/posts/ppp/ch5/multiprocessing_pipe.py)\n",
    "\n",
    "### 理解`multiprocessing.Queue`\n",
    "上一节我们分析了管道的概念，通过创建通道来建立进程间的通信。现在，我们将分析如何高效地实现这种通信，这就要用到`multiprocessing`中的`Queue`对象。`multiprocessing.Queue`的接口与`queue.Queue`非常相似。但是，`multiprocessing.Queue`的内部实现机制不一样，有一个内部线程叫供给线程(feeder thread)，通过把队列的数据缓存传输数据到目标进程相关的管道中。`Pipe`和`Queue`对象机制都是有消息传递范式，用户不用考虑同步机制。\n",
    "\n",
    ">虽然使用`multiprocessing.Queue`不用考虑同步机制，比如`Locks`互斥锁，但是这些机制在进程内部的缓存和管道之间进行通信的时候还是会用到。\n",
    "\n",
    "## 用`multiprocessing`实现多请求的Fibonacci数列\n",
    "让我们用进程来替代线程实现多请求的Fibonacci数列。\n",
    "\n",
    "`multiprocessing_fibonacci.py`代码用了`multiprocessing`模块，首先我们导入必要的模块，代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import sys\n",
    "import logging\n",
    "import time\n",
    "import os\n",
    "import random\n",
    "from multiprocessing import Process, Queue, Pool, cpu_count, current_process, Manager"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "这里导入的模块很多在上一章都用过，但是有几个模块需要说明一下：\n",
    "- `cpu_count`：这个函数获取电脑的CPU的数量\n",
    "- `current_process`：这个函数可以获取当前进程的信息，比如进程名称\n",
    "- [`Manager`](https://docs.python.org/3/library/multiprocessing.html)：这个对象提供代理的形式在不同的进程之间共享Python对象。\n",
    "\n",
    "后面的代码，我们会看到第一个函数有点不一样：它是随机从1到20的整数中随机抽取15个数。这些值将作为键插入到`fibo_dict`，一个`Manager`对象生成的字典。\n",
    "\n",
    ">虽然消息传递的方式更常用。但是，有时候，我们需要在不同进程之间共享一些数据，就像我们在`fibo_dict`字典里看到的一样。\n",
    "\n",
    "`producer_task`函数代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def producer_task(q, fibo_dict):\n",
    "    for i in range(15):\n",
    "        value = random.randint(1, 20)\n",
    "        fibo_dict[value] = None\n",
    "        logger.info(\"Producer [%s] putting value [%d] into queue.. \"\n",
    "                    % (current_process().name, value))\n",
    "        q.put(value)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "然后我们定义一个函数为`fibo_dict`中的每个键计算Fibonacci数列。和上一章线程函数最明显的不同就是这里把`fibo_dict`作为函数的参数使用。\n",
    "\n",
    "`consumer_task`函数代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def consumer_task(q, fibo_dict):\n",
    "    while not q.empty():\n",
    "        value = q.get(True, 0.05)\n",
    "        a, b = 0, 1\n",
    "        for item in range(value):\n",
    "            a, b = b, a + b\n",
    "            fibo_dict[value] = a\n",
    "        logger.info(\"consumer [%s] getting value [%d] from queue...\"\n",
    "                    % (current_process().name, value))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "继续看后面的代码，我们进入了主函数部分。这里定义了几个变量：\n",
    "- `data_queue`：根据Python的标准可以安全处理进程的`multiprocessing.Queue`\n",
    "- `number_of_cpus`：就是前面用过的`multiprocessing.cpu_count`函数\n",
    "- `fibo_dict`：`Manager`对象生成的字典，进程的最终结果会插入到里面"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "if __name__ == '__main__':\n",
    "    data_queue = Queue()\n",
    "    number_of_cpus = cpu_count()\n",
    "    manager = Manager()\n",
    "    fibo_dict = manager.dict()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "然后，我们新建一个`producer`对象，用`producer_task`函数处理`data_queue`中的随机数，代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "    producer = Process(target=producer_task, args=(data_queue, fibo_dict))\n",
    "    producer.start()\n",
    "    producer.join()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我们会看到`Process`类初始化的签名与`threading`模块的`Thread`类的签名一样。它会接受由worker进程并行地执行目标函数，和函数对应的参数。然后，我们开启进程并调用`join()`函数，让主进程在`producer`进程执行完毕后再结束。\n",
    "\n",
    "后面的代码是定义一个`consumer_list`，里面保存一个已经初始化的消费者进程列表。创建这个列表的目的是为了在所有的worker进程启动后对这列进程调用`join()`函数。如果直接在循环体中对每个项目调用`join()`函数，那么可能只有第一个worker会执行任务，因为下一个迭代会被阻塞等待这个worker进程运行完毕，最终可能下一个worker进程永远不会执行。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "    consumer_list = []\n",
    "    for i in range(number_of_cpus):\n",
    "        consumer = Process(target=consumer_task, args=(data_queue, fibo_dict))\n",
    "        consumer.start()\n",
    "        consumer_list.append(consumer)\n",
    "\n",
    "    [consumer.join() for consumer in consumer_list]\n",
    "\n",
    "    logger.info(fibo_dict)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "运行代码，`fibo_dict`的最终结果如下图所示：\n",
    "![](http://muxuezi.github.io/posts/ppp/ch5/multiprocessing_fibonacci.png)\n",
    "[源代码](http://muxuezi.github.io/posts/ppp/ch5/multiprocessing_fibonacci.py)\n",
    "\n",
    "## 用`ProcessPoolExecutor`实现并行网络爬虫\n",
    "就像`concurrent.futures`模块提供了`ThreadPoolExecutor`，也用创建和控制多进程的手段，那就是`ProcessPoolExecutor`类。`ProcessPoolExecutor`类，也是在`concurrent.futures`里面，将被用来实现我们并行网络爬虫。示例代码在`process_pool_executor_web_crawler.py`里面。\n",
    "\n",
    "这个代码导入的模块前面的例子类似，比如`requests`和`Manager`模块。与上一章多线程爬虫的例子些许不同是，我们用函数参数的形式发送任务数据；具体函数签名如下所示：\n",
    "\n",
    "`group_urls_task`函数定义如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def group_urls_task(urls, result_dict, html_link_regex)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`crawl_task`函数定义如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def crawl_task(url, html_link_regex)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "然我们再看一点儿有差异的代码。在主函数里面，我们定义了一个`Manager`类型，这样就可以共享队列，而不只是字段里的进程处理结果了。我们使用`Manager.Queue`对象来定义一个队列`urls`，存放需要被抓取的URL链接。`result_dict`字典，我们将用`Manager.dict()`对象，目的是要通过代理来管理字典。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "if __name__ == '__main__':\n",
    "    manager = Manager()\n",
    "    urls = manager.Queue()\n",
    "    urls.put('http://www.sina.com')\n",
    "    urls.put('http://cn.bing.com/')\n",
    "    urls.put('https://coding.net/')\n",
    "    urls.put('http://github.com/')\n",
    "    urls.put('http://mail.126.com/')\n",
    "    result_dict = manager.dict()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "然后，我们定义一个正则表达式来过滤页面中的链接，获取电脑CPU的数量，代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "    html_link_regex = \\\n",
    "        re.compile('<a\\s(?:.*?\\s)*?href=[\\'\"](.*?)[\\'\"].*?>')\n",
    "\n",
    "    number_of_cpus = cpu_count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在最后一部分，我们会看到连续使用`concurrent.futures`模块的API。和我们在上一章里使用的`ThreadPoolExecutor`完全一样。但是，`ProcessPoolExecutor`通过内部机制的改变避开了GIL对CPU的限制，而且也不需要对代码做过多的调整。两个`ProcessPoolExecutor`的worker进程数量都等于电脑CPU的数量。第一个`ProcessPoolExecutor`把URL任务并行插入字典`result_dict`，并把对应的字典值设置为`None`。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "    with concurrent.futures.ProcessPoolExecutor(max_workers=number_of_cpus) as group_link_processes:\n",
    "        for i in range(urls.qsize()):\n",
    "            group_link_processes.submit(group_urls_task, urls, result_dict, html_link_regex)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "第二个`ProcessPoolExecutor`执行网页链接抓取。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "    with concurrent.futures.ProcessPoolExecutor(max_workers=number_of_cpus) as crawler_link_processes:\n",
    "        future_tasks = {crawler_link_processes.submit(crawl_task, url, html_link_regex): url for url in result_dict.keys()}\n",
    "        for future in concurrent.futures.as_completed(future_tasks):\n",
    "            result_dict[future.result()[0]] = future.result()[1]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    ">用`concurrent.futures`实现多线程范式到多进程会更简单。\n",
    "\n",
    "运行代码，结果如下图所示：\n",
    "![](http://muxuezi.github.io/posts/ppp/ch5/process_pool_executor_web_crawler.png)\n",
    "[源代码](http://muxuezi.github.io/posts/ppp/ch5/process_pool_executor_web_crawler.py)\n",
    "\n",
    "## 本章小结\n",
    "在这一章，我们介绍了进程的概念，并用多进程方法实现多请求的Fibonacci数列和并行网络爬虫。\n",
    "\n",
    "下一章，我们将用著名的*parallel python*模块研究多进程问题，这个模块不是Python自带的模块。我们将介绍进程间通信的概念，以及如果用管道实现进程间通信。"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
